{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Developing, Training, and Deploying a TensorFlow model on Google Cloud Platform (completely within Jupyter)\n",
    "\n",
    "In this notebook, we will develop a Keras model to predict flight delays using TensorFlow 2.0 as the backend."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Make sure you are using compatible pip and TensorFlow 2.0\n",
    "\n",
    "You need pip 19.1 or higher and TensorFlow 2.1 and Python 3.7+. If not, run the necessary update scripts and check the versions again."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%pip --version"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import tensorflow as tf\n",
    "import sys\n",
    "print(tf.version.VERSION)\n",
    "print('{}.{}'.format(sys.version_info.major,sys.version_info.minor))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#%pip install --user --upgrade --quiet pip"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#%pip install --user --upgrade --quiet tf_nightly-2.0-preview"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Setup"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "tags": [
     "parameters"
    ]
   },
   "outputs": [],
   "source": [
    "# change these to try this notebook out\n",
    "# In \"production\", these will be replaced by the parameters passed to papermill\n",
    "BUCKET = 'cloud-training-demos-ml'\n",
    "PROJECT = 'cloud-training-demos'\n",
    "REGION = 'us-central1'\n",
    "DEVELOP_MODE = True\n",
    "NBUCKETS = 5 # for embeddings\n",
    "NUM_EXAMPLES = 1000*1000 # assume 1 million examples\n",
    "TRAIN_BATCH_SIZE = 64\n",
    "DNN_HIDDEN_UNITS = '64,32'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "os.environ['BUCKET'] = BUCKET\n",
    "os.environ['PROJECT'] = PROJECT\n",
    "os.environ['REGION'] = REGION"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bash\n",
    "gcloud config set project $PROJECT\n",
    "gcloud config set compute/region $REGION"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Creating the input data pipeline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "DATA_BUCKET = \"gs://{}/flights/chapter8/output/\".format(BUCKET)\n",
    "TRAIN_DATA_PATTERN = DATA_BUCKET + \"train*\"\n",
    "EVAL_DATA_PATTERN = DATA_BUCKET + \"test*\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!gcloud storage ls $DATA_BUCKET"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Use tf.data to read the CSV files"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os, json, math, shutil\n",
    "import numpy as np\n",
    "import tensorflow as tf\n",
    "print(\"Tensorflow version \" + tf.__version__)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "CSV_COLUMNS  = ('ontime,dep_delay,taxiout,distance,avg_dep_delay,avg_arr_delay' + \\\n",
    "                ',carrier,dep_lat,dep_lon,arr_lat,arr_lon,origin,dest').split(',')\n",
    "LABEL_COLUMN = 'ontime'\n",
    "DEFAULTS     = [[0.0],[0.0],[0.0],[0.0],[0.0],[0.0],\\\n",
    "                ['na'],[0.0],[0.0],[0.0],[0.0],['na'],['na']]\n",
    "\n",
    "def load_dataset(pattern, batch_size=1):\n",
    "  return tf.data.experimental.make_csv_dataset(pattern, batch_size, CSV_COLUMNS, DEFAULTS)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "if DEVELOP_MODE:\n",
    "    dataset = load_dataset(TRAIN_DATA_PATTERN)\n",
    "    for n, data in enumerate(dataset):\n",
    "        numpy_data = {k: v.numpy() for k, v in data.items()} # .numpy() works only in eager mode\n",
    "        print(numpy_data)\n",
    "        if n>3: break"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def features_and_labels(features):\n",
    "  label = features.pop('ontime') # this is what we will train for\n",
    "  return features, label\n",
    "\n",
    "def read_dataset(pattern, batch_size, mode=tf.estimator.ModeKeys.TRAIN, truncate=None):\n",
    "  dataset = load_dataset(pattern, batch_size)\n",
    "  dataset = dataset.map(features_and_labels)\n",
    "  if mode == tf.estimator.ModeKeys.TRAIN:\n",
    "    dataset = dataset.shuffle(batch_size*10)\n",
    "    dataset = dataset.repeat()\n",
    "  dataset = dataset.prefetch(1)\n",
    "  if truncate is not None:\n",
    "    dataset = dataset.take(truncate)\n",
    "  return dataset\n",
    "\n",
    "\n",
    "if DEVELOP_MODE:\n",
    "    print(\"Checking input pipeline\")\n",
    "    one_item = read_dataset(TRAIN_DATA_PATTERN, batch_size=2, truncate=1)\n",
    "    print(list(one_item)) # should print one batch of 2 items"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create TensorFlow wide-and-deep model\n",
    "\n",
    "We'll create feature columns, and do some discretization and feature engineering.\n",
    "See the book for details."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import tensorflow as tf\n",
    "\n",
    "real = {\n",
    "    colname : tf.feature_column.numeric_column(colname) \n",
    "          for colname in \n",
    "            ('dep_delay,taxiout,distance,avg_dep_delay,avg_arr_delay' +\n",
    "             ',dep_lat,dep_lon,arr_lat,arr_lon').split(',')\n",
    "}\n",
    "sparse = {\n",
    "      'carrier': tf.feature_column.categorical_column_with_vocabulary_list('carrier',\n",
    "                  vocabulary_list='AS,VX,F9,UA,US,WN,HA,EV,MQ,DL,OO,B6,NK,AA'.split(',')),\n",
    "      'origin' : tf.feature_column.categorical_column_with_hash_bucket('origin', hash_bucket_size=1000),\n",
    "      'dest'   : tf.feature_column.categorical_column_with_hash_bucket('dest', hash_bucket_size=1000)\n",
    "}\n",
    "\n",
    "inputs = {\n",
    "    colname : tf.keras.layers.Input(name=colname, shape=(), dtype='float32') \n",
    "          for colname in real.keys()\n",
    "}\n",
    "inputs.update({\n",
    "    colname : tf.keras.layers.Input(name=colname, shape=(), dtype='string') \n",
    "          for colname in sparse.keys()\n",
    "})"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Feature engineering"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "latbuckets = np.linspace(20.0, 50.0, NBUCKETS).tolist()  # USA\n",
    "lonbuckets = np.linspace(-120.0, -70.0, NBUCKETS).tolist() # USA\n",
    "disc = {}\n",
    "disc.update({\n",
    "       'd_{}'.format(key) : tf.feature_column.bucketized_column(real[key], latbuckets) \n",
    "          for key in ['dep_lat', 'arr_lat']\n",
    "})\n",
    "disc.update({\n",
    "       'd_{}'.format(key) : tf.feature_column.bucketized_column(real[key], lonbuckets) \n",
    "          for key in ['dep_lon', 'arr_lon']\n",
    "})\n",
    "\n",
    "# cross columns that make sense in combination\n",
    "sparse['dep_loc'] = tf.feature_column.crossed_column([disc['d_dep_lat'], disc['d_dep_lon']], NBUCKETS*NBUCKETS)\n",
    "sparse['arr_loc'] = tf.feature_column.crossed_column([disc['d_arr_lat'], disc['d_arr_lon']], NBUCKETS*NBUCKETS)\n",
    "sparse['dep_arr'] = tf.feature_column.crossed_column([sparse['dep_loc'], sparse['arr_loc']], NBUCKETS ** 4)\n",
    "#sparse['ori_dest'] = tf.feature_column.crossed_column(['origin', 'dest'], hash_bucket_size=1000)\n",
    "\n",
    "# embed all the sparse columns\n",
    "embed = {\n",
    "       'embed_{}'.format(colname) : tf.feature_column.embedding_column(col, 10)\n",
    "          for colname, col in sparse.items()\n",
    "}\n",
    "real.update(embed)\n",
    "\n",
    "# one-hot encode the sparse columns\n",
    "sparse = {\n",
    "    colname : tf.feature_column.indicator_column(col)\n",
    "          for colname, col in sparse.items()\n",
    "}\n",
    "\n",
    "if DEVELOP_MODE:\n",
    "    print(sparse.keys())\n",
    "    print(real.keys())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Train the model and evaluate once in a while\n",
    "\n",
    "Also checkpoint"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "output_dir='gs://{}/flights/trained_model'.format(BUCKET)\n",
    "os.environ['OUTDIR'] = output_dir  # needed for deployment\n",
    "print('Writing trained model to {}'.format(output_dir))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!gcloud storage rm --recursive --continue-on-error $OUTDIR"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Build a wide-and-deep model.\n",
    "def wide_and_deep_classifier(inputs, linear_feature_columns, dnn_feature_columns, dnn_hidden_units):\n",
    "    deep = tf.keras.layers.DenseFeatures(dnn_feature_columns, name='deep_inputs')(inputs)\n",
    "    layers = [int(x) for x in dnn_hidden_units.split(',')]\n",
    "    for layerno, numnodes in enumerate(layers):\n",
    "        deep = tf.keras.layers.Dense(numnodes, activation='relu', name='dnn_{}'.format(layerno+1))(deep)        \n",
    "    wide = tf.keras.layers.DenseFeatures(linear_feature_columns, name='wide_inputs')(inputs)\n",
    "    both = tf.keras.layers.concatenate([deep, wide], name='both')\n",
    "    output = tf.keras.layers.Dense(1, activation='sigmoid', name='pred')(both)\n",
    "    model = tf.keras.Model(inputs, output)\n",
    "    model.compile(optimizer='adam',\n",
    "                  loss='binary_crossentropy',\n",
    "                  metrics=['accuracy'])\n",
    "    return model\n",
    "    \n",
    "model = wide_and_deep_classifier(\n",
    "    inputs,\n",
    "    linear_feature_columns = sparse.values(),\n",
    "    dnn_feature_columns = real.values(),\n",
    "    dnn_hidden_units = DNN_HIDDEN_UNITS)\n",
    "tf.keras.utils.plot_model(model, 'flights_model.png', show_shapes=False, rankdir='LR')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "You should see:\n",
    "<img src=\"flights_model.png\"/>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# training and evaluation dataset\n",
    "train_batch_size = TRAIN_BATCH_SIZE\n",
    "if DEVELOP_MODE:\n",
    "    eval_batch_size = 100\n",
    "    steps_per_epoch = 3\n",
    "    epochs = 2\n",
    "else:\n",
    "    eval_batch_size = 10000\n",
    "    steps_per_epoch = NUM_EXAMPLES // train_batch_size\n",
    "    epochs = 10\n",
    "train_dataset = read_dataset(TRAIN_DATA_PATTERN, train_batch_size)\n",
    "eval_dataset = read_dataset(EVAL_DATA_PATTERN, eval_batch_size, tf.estimator.ModeKeys.EVAL, eval_batch_size*10)\n",
    "\n",
    "checkpoint_path = '{}/checkpoints/flights.cpt'.format(output_dir)\n",
    "shutil.rmtree(checkpoint_path, ignore_errors=True)\n",
    "cp_callback = tf.keras.callbacks.ModelCheckpoint(checkpoint_path, \n",
    "                                                 save_weights_only=True,\n",
    "                                                 verbose=1)\n",
    "\n",
    "history = model.fit(train_dataset, \n",
    "                    validation_data=eval_dataset,\n",
    "                    epochs=epochs, \n",
    "                    steps_per_epoch=steps_per_epoch,\n",
    "                    callbacks=[cp_callback])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(history.history.keys())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import matplotlib.pyplot as plt\n",
    "nrows = 1\n",
    "ncols = 2\n",
    "fig = plt.figure(figsize=(10, 5))\n",
    "\n",
    "for idx, key in enumerate(['loss', 'accuracy']):\n",
    "    ax = fig.add_subplot(nrows, ncols, idx+1)\n",
    "    plt.plot(history.history[key])\n",
    "    plt.plot(history.history['val_{}'.format(key)])\n",
    "    plt.title('model {}'.format(key))\n",
    "    plt.ylabel(key)\n",
    "    plt.xlabel('epoch')\n",
    "    plt.legend(['train', 'validation'], loc='upper left');"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Export and deploy the trained model"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import time\n",
    "export_dir = '{}/export/flights_{}'.format(output_dir, time.strftime(\"%Y%m%d-%H%M%S\"))\n",
    "print('Exporting to {}'.format(export_dir))\n",
    "tf.saved_model.save(model, export_dir)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bash\n",
    "model_dir=$(gcloud storage ls ${OUTDIR}/export | tail -1)\n",
    "echo $model_dir\n",
    "saved_model_cli show --tag_set serve --signature_def serving_default --dir $model_dir"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bash\n",
    "PROJECT=cloud-training-demos\n",
    "BUCKET=${PROJECT}-ml\n",
    "REGION=us-east1\n",
    "MODEL_NAME=flights\n",
    "VERSION_NAME=tf2\n",
    "EXPORT_PATH=$(gcloud storage ls ${OUTDIR}/export | tail -1)\n",
    "\n",
    "if [[ $(gcloud ai-platform models list --format='value(name)' | grep $MODEL_NAME) ]]; then\n",
    "    echo \"$MODEL_NAME already exists\"\n",
    "else\n",
    "    # create model\n",
    "    echo \"Creating $MODEL_NAME\"\n",
    "    gcloud ai-platform models create --regions=$REGION $MODEL_NAME\n",
    "fi\n",
    "\n",
    "if [[ $(gcloud ai-platform versions list --model $MODEL_NAME --format='value(name)' | grep $VERSION_NAME) ]]; then\n",
    "    echo \"Deleting already existing $MODEL_NAME:$VERSION_NAME ... \"\n",
    "    gcloud ai-platform versions delete --model=$MODEL_NAME $VERSION_NAME\n",
    "    echo \"Please run this cell again if you don't see a Creating message ... \"\n",
    "    sleep 10\n",
    "fi\n",
    "\n",
    "# create model\n",
    "echo \"Creating $MODEL_NAME:$VERSION_NAME\"\n",
    "gcloud ai-platform versions create --model=$MODEL_NAME $VERSION_NAME --async \\\n",
    "       --framework=tensorflow --python-version=3.7 --runtime-version=2.1 \\\n",
    "       --origin=$EXPORT_PATH --staging-bucket=gs://$BUCKET"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Wait for a couple of minutes for model to get deployed (monitor https://console.cloud.google.com/ai-platform/models/flights/versions), and then try invoking it:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!gcloud ai-platform predict --model=flights --version=tf2 --json-instances=example_input.json"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Copyright 2016-2020 Google Inc. Licensed under the Apache License, Version 2.0 (the \"License\"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "celltoolbar": "Tags",
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.6"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
